繪畫世界

Rust 父进程和 Deno 子进程数据通信

#前言

我有一个需求需要使用 NodeJS 生态的包,但作为 TypeScript 的忠实用户又不想使用 NodeJS,因为开发还需要配置构建工具,尽管这些构建工具非常成熟,于是我将目光投向了 Deno。

Deno 前几年就了解过,但依稀记得当时的体验很差,没想到过几年 2.0 出来后开发体验感觉好很多,还可以直接引入 npm 的包。

决定使用 Deno 开发后,一切感觉都很新颖~~(或许不用 NodeJS 的原因就是想体验新鲜感)~~

说了一些废话后就该思考 Rust 父进程如何和 Deno 子进程进行通信,通信的方式据我个人的了解应该有如下几种

  1. 标准输入输出(stdin/stdout/pipe
  2. Unix domain socket
  3. 共享内存/共享文件
  4. 消息队列
  5. TCP/UDP

考虑应用场景还是使用标准输入输出来进行数据通信,感觉 Unix domain socket 更好,但是 Windows 下 Rust 的并不支持,故还是选择 stdin/stdout

#数据包设计

选择 stdin/stdout 方案有个问题就是普通的日志输出和数据输出会冲突, 一般的解决方案有:

  1. 针对数据输出做标记
  2. 劫持 console,让日志输出也按数据输出
  3. 劫持 console,让日志输出走 stderr 通道

这里我选择的是第一种方案,因此需要设计一个标记用以区分日志输出和数据输出,日志输出一般是 UTF-8 编码,因此单个字节的标记容易误识别,所以选择了三个字节,数据包的设计如下:

[ 识别码: 3u8 ][ 事件码: 1u8 ][ 数据长度: 4u8 ][ 校验和: 4u8 ][ 数据: [u8] ]

头部共 12 个字节,其中识别码采用 [0x10, 0xAA, 0xFF] 这三个值并无含义,可以随机指定。

#Rust 侧实现

由于异步传染,这里采用 tokio

首先定义一个枚举用于表示数据的类型,这里称为事件码

#[repr(u8)]  
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum EventCode{
    Data = 0x01,
    Exit = 0xF1,
    Flush = 0xF2
}

这里总共定义了三个,其含义分别是:

数据包构建

async fn send_binary_packet<W>(
    writer: &mut W,
    event_code: EventCode,
    bytes: &[u8]
) -> anyhow::Result<()>
where
    W: AsyncWrite + Unpin
{
    let mut head = [0; 12];
    head[0..3].copy_from_slice(&[0x10, 0xAA, 0xFF]);
    head[3] = event_code as u8;
    head[4..8].copy_from_slice(&(bytes.len() as u32).to_by_bytes());
    head[8..12].copy_from_slice(&crc32(bytes).to_be_bytes());
    
    let mut bufs: &mut [_] = &mut [IoSlice::new(&head), IoSlice::new(bytes)];
    // writer.write_all_vectored(&mut bufs);
    IoSlice::advance_slices(&mut bufs, 0);
    while !bufs.is_empty() {
        match writer.write_vectored(bufs).await {  
            Ok(0) => anyhow::bail!("..."),  
            Ok(n) => IoSlice::advance_slices(&mut bufs, n),  
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => (),  
            Err(e) => return Err(e.into()),  
        }
    }
}

启动 Deno 子进程并获取 stdinstdout 管道

use tokio::process::{Child, ChildStdin, ChildStdout, Command};

let mut deno_process = Command::new("deno")
    .arg("run")
    .arg("--allow-all") // deno 需要明确指定需要哪些权限
    .arg("main.ts")
    .stdin(Stdio::piped()) // 创建输入管道
    .stdout(Stdio::stdout()) // 创建输出管道
    .stderr(Stdio::inherit()) // 不处理 stderr, 让其输出到控制台
    .spawn()?;

let deno_stdin = deno_process
    .stdin
    .take()
    .ok_or(anyhow::format_err!(""))?;
let deno_stdout = deno_process
    .stdout.take()
    .ok_or(anyhow::format_err!(""))?;

读取 stdout 管道,由于需要使用切片进行分割,而 tokio 库中并read_until只能使用单个字节作为分隔符而不能使用切片作为分隔符因此需要我们自行实现一个函数,可以参考 read_until 的源码实现一个 read_until_slice

let mut reader = BufReader::new(deno_stdout);
let mut buffer = Vec::new();
loop {
    let bytes_read = reader.read_unitl_slice(&[0x10, 0xAA, 0xFF], &mut buffer).await?;
    if bytes_read == 0 { break; } // EOF
    // 读取剩下的 9 bytes 数据
    {
        let mut tmp = [0; 9];
        reader.read_exact(&mut tmp).await?;
        buffer.extend_from_slice(&tmp)
    }
    let pos = find_subarray(&buffer, &[0x10, 0xAA, 0xFF]);
    // 处理识别码前的数据(即 console 输出)
    if pos > 0 {
        let console_output = &buffer[..pos];
        io::stdout().write_all(console_output).await?;
        buffer.drain(..pos);
    }
    // 处理二进制包(剩余部分)
    let head = &buffer[..12];
    let event_code = head[3];
    let data_len = u32::from_be_bytes((&head[4..8]).try_into()?);
    let checksum = u32::from_be_bytes((&head[8..12]).try_into()?);
    buffer.drain(..12);
    // 忽略该事件
    if event_code == EventCode::Flush as u8 {  
        continue;  
    }
    // 剩下来就从 reader 读取 data_len 字节数的数据了和校验数据完整性了
}

#Deno 侧实现

stdin 中读取数据

EventStream: event-stream.ts

const events = Deno.stdin.readable.pipeThrough(new EventStream());
for await (const [evetCode, data] of events){
    ...
}

发送数据到 stdout

export const sendBinaryPacket = async (
  eventCode: EventCode,
  data: Uint8Array,
) => {
  const head = new Uint8Array(12);
  head.set([0x10, 0xAA, 0xFF], 0);
  const view = new DataView(head.buffer);
  view.setUint8(3, eventCode);
  view.setUint32(4, data.length);
  view.setUint32(8, crc32(data));
  const packet = new Uint8Array(head.length + data.length);
  packet.set(head, 0);
  packet.set(data, head.length);
  try {
    await Deno.stdout.write(packet);
  } catch (e) {
    if (e instanceof Deno.errors.BrokenPipe) {
      logger.error('Stdout pipe unexpectedly closed');
      Deno.exit();
    }
    throw e;
  }
};

#Rust #Deno

笔记

2466 Words